package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	"bytes"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"../labgob"
	"../labrpc"
)

// import "bytes"
// import "../labgob"

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in Lab 3 you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh; at that point you can add fields to
// ApplyMsg, but set CommandValid to false for these other uses.
//
// ApplyMsg is delivered on applyCh each time an entry is committed
// (CommandValid=true) or a snapshot must be installed (CommandValid=false).
type ApplyMsg struct {
	CommandValid bool // true when this message carries a newly committed log entry
	Command      interface{} // the committed command itself
	CommandIndex int // log index where the command was committed
	CommandTerm  int  // for lab3A, check for lost leadership
	SnapshotData []byte  // for lab3B, update state machine
}

// Role is the server's role in the Raft protocol; a server is exactly
// one of follower, candidate, or leader at any moment.
type Role int

const (
	FOLLOWER  Role = 0
	CANDIDATE Role = 1
	LEADER    Role = 2
)

// String returns the human-readable name of the role; it panics on a
// value outside the three declared constants.
func (s Role) String() string {
	if s == FOLLOWER {
		return "Follower"
	}
	if s == CANDIDATE {
		return "Candidate"
	}
	if s == LEADER {
		return "Leader"
	}
	panic("unreachable")
}

// LogEntry is one entry of the replicated log.
type LogEntry struct {
	Command interface{}
	Term    int // the term in which this entry was written
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Persistent state on all servers (paper Figure 2); written to stable
	// storage by persist() before responding to RPCs.
	currentTerm int
	votedFor    int        // vote for the candidate id
	log         []LogEntry // log entries, the first valid index is 1
	startIndex  int  // lab3B: the start index of the log, 1 for begin

	role Role

	commitIndex int // index of the highest log entry known to be committed
	lastApplied int // index of the highest log entry applied to the state machine

	nextIndex  []int // for each server, index of the next log entry to send to it
	matchIndex []int // for each server, highest log index known to be replicated on it

	// channel for communication
	chAppendEntry chan struct{} // signals that a valid heartbeat/AppendEntries arrived
	chRequestVote chan struct{} // signals that a vote was granted to some candidate
	
	// for lab3B
	lastIncludedIndex int   // equals to (startIndex - 1)
	lastIncludedTerm int

	applyCh chan ApplyMsg  // lab3B: for update state machine
}

// stepDown reverts this server to follower in the given (higher) term,
// clears its vote, and persists the new state. Caller must hold rf.mu.
func (rf *Raft) stepDown(term int) {
	rf.currentTerm = term
	rf.votedFor = -1
	rf.role = FOLLOWER
	rf.persist()
}

// init seeds the global math/rand generator once, so election timeouts
// differ between runs.
func init() {
	rand.Seed(time.Now().UnixNano())
	// labgob.Register(struct{}{})
}

// GetState returns this server's current term and whether it currently
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm, rf.role == LEADER
}

// persist writes Raft's persistent state (currentTerm, votedFor, log,
// startIndex) to stable storage so it can be restored after a crash and
// restart (paper Figure 2). Caller must hold rf.mu.
func (rf *Raft) persist() {
	rf.persister.SaveRaftState(rf.encodeRaftState())
}

// encodeRaftState serializes the persistent fields with labgob. The
// field order here must match the decode order in readPersist.
func (rf *Raft) encodeRaftState() []byte {
	var buf bytes.Buffer
	enc := labgob.NewEncoder(&buf)
	if err := enc.Encode(rf.currentTerm); err != nil {
		log.Fatal("encode currentterm", err)
	}
	if err := enc.Encode(rf.votedFor); err != nil {
		log.Fatal("encode votefor err:", err)
	}
	if err := enc.Encode(rf.log); err != nil {
		log.Fatalln(err)
	}
	if err := enc.Encode(rf.startIndex); err != nil {
		log.Fatalln("encode startIndex err:", err)
	}
	return buf.Bytes()
}

// readPersist restores previously persisted state (currentTerm, votedFor,
// log, startIndex) from data, and (re)initializes the volatile per-peer
// index slices. With no saved state it leaves the zero-value defaults.
func (rf *Raft) readPersist(data []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	rf.role = FOLLOWER
	rf.nextIndex = make([]int, len(rf.peers))
	rf.matchIndex = make([]int, len(rf.peers))

	// bootstrap without any state?
	if len(data) == 0 {
		return
	}

	dec := labgob.NewDecoder(bytes.NewBuffer(data))
	// decode order must mirror the encode order in encodeRaftState
	if err := dec.Decode(&rf.currentTerm); err != nil {
		log.Fatalln(err)
	}
	if err := dec.Decode(&rf.votedFor); err != nil {
		log.Fatalln(err)
	}
	if err := dec.Decode(&rf.log); err != nil {
		log.Fatalln(err)
	}
	if err := dec.Decode(&rf.startIndex); err != nil {
		log.Fatalln(err)
	}
	// normalize any out-of-range vote marker to the canonical -1
	if rf.votedFor < 0 {
		rf.votedFor = -1
	}
	RPrintf(rf, "readPersist currentTerm=%d votedFor=%d log=%v startIndex=%d\n", rf.currentTerm, rf.votedFor, rf.log, rf.startIndex)
}

// AppendEntriesArgs is the AppendEntries RPC arguments structure.
// With an empty Entries slice this doubles as the leader's heartbeat.
type AppendEntriesArgs struct {
	Term         int        // leader’s term
	LeaderID     int        // so follower can redirect clients
	PrevLogIndex int        // index of log entry immediately preceding new ones
	PrevLogTerm  int        // term of prevLogIndex entry
	Entries      []LogEntry // log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int        // leader’s commitIndex
}

// AppendEntriesReply is the AppendEntries RPC reply structure.
type AppendEntriesReply struct {
	Term    int
	Success bool

	ConflictIndex int // set when PrevLogIndex does not match on a replicate-log request; hints where the leader should back nextIndex up to
}

// sendAppendEntry issues the AppendEntry RPC to the given peer and
// reports whether a reply arrived.
func (rf *Raft) sendAppendEntry(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	return rf.peers[server].Call("Raft.AppendEntry", args, reply)
}

// AppendEntry is the AppendEntries RPC receiver handler (heartbeat and
// log replication). It steps down on a higher term, rejects stale terms,
// reports a ConflictIndex hint when PrevLogIndex/PrevLogTerm do not
// match, and otherwise reconciles the local log with args.Entries and
// advances commitIndex up to LeaderCommit.
func (rf *Raft) AppendEntry(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term > rf.currentTerm {
		RPrintf(rf, "receive high term AE, step down.")
		rf.stepDown(args.Term)
	}
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		RPrintf(rf, "invalid AppendEntry RPC, args.Term=%d", args.Term)
		reply.Success = false
		reply.ConflictIndex = -1
		return
	} else if args.PrevLogIndex >= rf.getLogTailIndex() || rf.logAt(args.PrevLogIndex).Term != args.PrevLogTerm {
		reply.Success = false

		if args.PrevLogIndex >= rf.getLogTailIndex() {
			// our log is too short: ask the leader to resend from our tail
			reply.ConflictIndex = rf.getLogTailIndex()
		} else {
			// our log is long enough but conflicts at PrevLogIndex; back up
			// to the first index we hold for the conflicting term
			reply.ConflictIndex = args.PrevLogIndex
			conflictTerm := rf.logAt(reply.ConflictIndex).Term
			for i := reply.ConflictIndex - 1; i >= rf.startIndex; i-- { // lab3B: stop at startIndex, earlier entries are in the snapshot
				if rf.logAt(i).Term != conflictTerm {
					break
				} else {
					reply.ConflictIndex = i
				}
			}
		}
		RPrintf(rf, "Reply false AppendEntries: PrevLogIndex %d dismatch. ConflictIndex=%d\n", args.PrevLogIndex, reply.ConflictIndex)
	} else {
		rf.currentTerm = args.Term
		reply.Success = true
		reply.ConflictIndex = -1
		reply.Term = rf.currentTerm // Don't forget the server's term

		// find the first conflicting (or missing) entry, truncate from
		// there, and append the leader's remaining entries
		matchIndex := args.PrevLogIndex + 1
		for i, v := range args.Entries {
			if matchIndex+i >= rf.getLogTailIndex() || rf.logAt(matchIndex+i).Term != v.Term {
				rf.log = rf.log[:rf.convLogIndex(matchIndex+i)]               // delete all following logs from conflict
				rf.log = append(rf.log, args.Entries[i:]...) // append new logs
				break
			}
		}
		if len(args.Entries) > 0 {
			rf.persist()
			// BUG FIX: report the logical last index; len(rf.log)-1 is
			// wrong once startIndex > 1 after log compaction (lab3B)
			RPrintf(rf, "Append log, new lastLogIndex = %d\n", rf.getLogTailIndex()-1)
		}

		if args.LeaderCommit > rf.commitIndex {
			rf.commitIndex = min(rf.getLogTailIndex()-1, args.LeaderCommit) // Updating commitIndex
			RPrintf(rf, "Update commitIndex = %d\n", rf.commitIndex)
		}
	}
	// every request whose term is >= currentTerm counts as a heartbeat;
	// signal asynchronously so this RPC handler never blocks on the channel
	go func() {
		rf.chAppendEntry <- struct{}{}
	}()
}

//
// RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int // candidate's term
	CandidateID  int // the candidate's index into the peers array serves as its id
	LastLogIndex int // index of the candidate's last log entry
	LastLogTerm  int // term of the candidate's last log entry
}

//
// RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means the vote was granted to the candidate
}

//
// RequestVote RPC handler; args carries the candidate's credentials.
// Grants the vote iff the candidate's term is current, this server has
// not already voted for someone else this term, and the candidate's log
// is at least as up-to-date as ours (election restriction, §5.4.1).
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	RPrintf(rf, "Receive RV from %d", args.CandidateID)
	if args.Term > rf.currentTerm { // a higher term demotes us regardless of current role (even leader)
		RPrintf(rf, "Receive high term RV, step down.")
		rf.stepDown(args.Term)
	}
	reply.Term = rf.currentTerm

	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		RPrintf(rf, "Refuse vote to %d because the args low term=%d\n", args.CandidateID, args.Term)
	} else if rf.votedFor == -1 || rf.votedFor == args.CandidateID {
		curLogIndex := rf.getLogTailIndex() - 1
		if rf.logAt(curLogIndex).Term < args.LastLogTerm || (args.LastLogTerm == rf.logAt(curLogIndex).Term && args.LastLogIndex >= curLogIndex) { // candidate's log must be at least as new as ours
			RPrintf(rf, "Vote to %d\n", args.CandidateID)
			reply.VoteGranted = true
			rf.votedFor = args.CandidateID

			rf.persist()

			// notify the main loop that a legal vote was granted; done in a
			// goroutine so the RPC handler never blocks on the channel
			go func() {
				rf.chRequestVote <- struct{}{}
			}()
		} else {
			reply.VoteGranted = false
			RPrintf(rf, "Refuse vote to %d, because the old log\n", args.CandidateID)
		}
	} else {
		reply.VoteGranted = false
		RPrintf(rf, "Refuse vote to %d, because has voted->%d.\n", args.CandidateID, rf.votedFor)
	}
}

//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return.  Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
// sendRequestVote issues the RequestVote RPC to the given peer and
// reports whether a reply arrived.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	return rf.peers[server].Call("Raft.RequestVote", args, reply)
}

//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
// Start proposes a new command for agreement. If this server is not the
// leader it returns (-1, term, false); otherwise it appends the command
// to its log, persists, and returns the index the command will occupy if
// it is ever committed (first index is 1), the current term, and true.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	term := rf.currentTerm
	if rf.role != LEADER {
		return -1, term, false
	}

	index := rf.getLogTailIndex() // the index that the command will appear at (start at 1)
	rf.log = append(rf.log, LogEntry{command, rf.currentTerm})
	RPrintf(rf, "Receive command. Index = %d cmd = %v\n", index, command)
	rf.persist()
	return index, term, true
}

//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
// Kill marks this server as dead; long-running goroutines observe it
// through killed() and exit.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
}

// killed reports whether Kill() has been called on this server.
func (rf *Raft) killed() bool {
	return atomic.LoadInt32(&rf.dead) == 1
}

// changeRole switches this server to the given role and clears its vote.
// NOTE(review): votedFor is persistent state (Figure 2) but this reset is
// not persisted, and clearing the vote without a term change could in
// principle allow a second vote in the same term after a crash — confirm
// this is safe for every call site.
func (rf *Raft) changeRole(role Role) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	rf.role = role
	rf.votedFor = -1
}

// sendingHeartBeat is the leader's main loop: it (re)initializes
// nextIndex/matchIndex, starts the commit checker, and every 200ms sends
// AppendEntries (heartbeat plus any pending entries) to every follower.
// It returns when this server loses leadership.
func (rf *Raft) sendingHeartBeat() {
	rf.mu.Lock()
	lastLogIndex := rf.getLogTailIndex() - 1
	leaderCurrentTerm := rf.currentTerm // snapshot the term on becoming leader; later RPCs may move rf.currentTerm forward
	// BUG FIX: nextIndex/matchIndex are read by the reply goroutines below,
	// so they must be (re)initialized while still holding the lock —
	// previously this loop ran after Unlock, a data race.
	for i := range rf.peers {
		rf.nextIndex[i] = lastLogIndex + 1
		rf.matchIndex[i] = 0 // index of highest log entry known to be replicated on server
	}
	rf.mu.Unlock()

	timer := time.NewTimer(time.Duration(time.Millisecond * 200)) // send heartbeats every 200ms

	// periodically check whether any log entry can be committed
	go rf.logCommitter(leaderCurrentTerm)

	for {
		rf.mu.Lock()
		if rf.role != LEADER { // demoted by some higher-term RPC
			RPrintf(rf, "Got High Term RPC, no longer a leader")
			rf.mu.Unlock()
			return
		}
		rf.mu.Unlock()
		for serverID := range rf.peers {
			if serverID == rf.me {
				continue
			}
			go func(server int) {
				rf.mu.Lock()
				args := AppendEntriesArgs{}
				args.Term = leaderCurrentTerm
				args.LeaderID = rf.me
				args.PrevLogIndex = rf.nextIndex[server] - 1 // index of log entry immediately preceding new ones
				args.PrevLogTerm = rf.logAt(args.PrevLogIndex).Term
				args.Entries = []LogEntry{} // empty log entries
				args.LeaderCommit = rf.commitIndex
				reply := AppendEntriesReply{}

				RPrintf(rf, "collect log entris, nextIndex[%d] = %d, rf.startIndex=%d", server, rf.nextIndex[server], rf.startIndex)
				args.Entries = append(args.Entries, rf.log[rf.convLogIndex(rf.nextIndex[server]):]...)
				rf.mu.Unlock()

				if ok := rf.sendAppendEntry(server, &args, &reply); ok {
					rf.mu.Lock()
					// step down if a higher term came back while still leader
					if leaderCurrentTerm < reply.Term && rf.role == LEADER {
						rf.stepDown(reply.Term)
						RPrintf(rf, "Saw term %d, step down\n", reply.Term)
						rf.mu.Unlock()
						return
					} else if rf.role != LEADER {
						RPrintf(rf, "Not leader anymore\n")
						rf.mu.Unlock()
						return
					}
					if len(args.Entries) == 0 {
						// pure heartbeat reply: nothing to record
						rf.mu.Unlock()
						return
					}

					if reply.Success {
						highReplicatedIndex := args.PrevLogIndex + len(args.Entries)
						rf.nextIndex[server] = highReplicatedIndex + 1
						rf.matchIndex[server] = highReplicatedIndex
						RPrintf(rf, "Update Server %d matchIndex = %d, nextIndex = %d\n", server, rf.matchIndex[server], rf.nextIndex[server])
						rf.mu.Unlock()
					} else {
						if reply.ConflictIndex < rf.startIndex {
							// lab3B: the follower is behind our snapshot and needs it
							RPrintf(rf, "reply.ConflictIndex %d, rf.startIndex %d\n", reply.ConflictIndex, rf.startIndex)
							RPrintf(rf, "send InstallSnapshot to follower %d\n", server)
							rf.mu.Unlock()
							rf.leaderSendInstallSnapshot(server)
						} else {
							rf.nextIndex[server] = reply.ConflictIndex
							RPrintf(rf, "decrease Follower %d's nextIndex to %d\n", server, reply.ConflictIndex)
							rf.mu.Unlock()
						}
					}
				}
			}(serverID)
		}

		select {
		case <-timer.C:
			timer.Reset(time.Duration(time.Millisecond * 200))
		case <-rf.chAppendEntry: // a valid AppendEntries arrived: another leader exists
			rf.changeRole(FOLLOWER)
			return
		case <-rf.chRequestVote: // we granted a vote to a higher-term candidate
			rf.changeRole(FOLLOWER)
			return
		}
	}
}

// logCommitter is run by the leader for one term: it periodically scans
// matchIndex and advances commitIndex over every entry of the current
// term replicated on a majority (Log Matching property; only entries
// from the leader's own term may be committed by counting, §5.4.2).
func (rf *Raft) logCommitter(term int) {
	rf.mu.Lock()
	stale := rf.role != LEADER || term != rf.currentTerm
	npeers := len(rf.peers)
	rf.mu.Unlock()

	for ; !rf.killed() && !stale; time.Sleep(GUARDTIMEOUT) { // keep re-checking matchIndex
		rf.mu.Lock()
		// BUG FIX: this was `stale := ...`, which shadowed the loop
		// variable and kept the goroutine alive after losing leadership.
		stale = rf.role != LEADER || term != rf.currentTerm
		if !stale {
			for n := rf.commitIndex + 1; n < rf.getLogTailIndex(); n++ {
				if c := rf.logAt(n); c.Term == rf.currentTerm { // Log Matching property
					// count ourselves plus every peer whose matchIndex covers n
					for p, replicas := 0, 1; p < npeers; p++ {
						if p == rf.me {
							continue
						}
						if rf.matchIndex[p] >= n {
							replicas += 1
						}
						if replicas > npeers/2 {
							rf.commitIndex = n
							RPrintf(rf, "Log %d replicated.\n", n)
							break
						}
					}
				}
			}
		}
		rf.mu.Unlock()
	}
}

// logApplier is run by every server: it periodically pushes the newly
// committed entries (lastApplied+1 .. commitIndex) onto applyCh in order.
func (rf *Raft) logApplier(applyCh chan ApplyMsg) {
	for ; !rf.killed(); time.Sleep(GUARDTIMEOUT) {
		rf.mu.Lock()
		for rf.commitIndex > rf.lastApplied {
			apply := rf.lastApplied + 1
			RPrintf(rf, "Applied log[%d]=%v to state machine\n", apply, rf.logAt(apply).Command)
			// NOTE(review): this send happens while rf.mu is held; if the
			// service ever stops draining applyCh the whole peer blocks
			// with the lock taken — confirm the consumer always reads.
			applyCh <- ApplyMsg{true, rf.logAt(apply).Command, apply, rf.logAt(apply).Term, []byte{}}
			rf.lastApplied = apply
		}
		rf.mu.Unlock()
	}
}

// election runs one candidate round: increment the term, vote for self,
// request votes from all peers in parallel, then transition on whichever
// outcome arrives first — majority of votes, a higher-term reply, a valid
// leader/candidate message, or the election timer.
func (rf *Raft) election() {
	rf.mu.Lock()
	rf.currentTerm++
	rf.votedFor = rf.me

	args := RequestVoteArgs{}
	args.CandidateID = rf.me
	args.Term = rf.currentTerm
	args.LastLogIndex = rf.getLogTailIndex() - 1
	args.LastLogTerm = rf.logAt(args.LastLogIndex).Term

	candidateTerm := rf.currentTerm

	rf.persist()

	rf.mu.Unlock()

	voteCount := 0
	// BUG FIX: both channels are buffered (capacity 1). The sends below
	// happen while holding rf.mu; with unbuffered channels, if this
	// function had already returned through another select case the
	// sender would block forever WITH the mutex held, deadlocking the
	// whole peer.
	voteOk := make(chan struct{}, 1)
	var notified bool = false // whether voteOk has already been signalled

	stepDownCh := make(chan struct{}, 1)

	timer := time.NewTimer(time.Millisecond * time.Duration(rand.Intn(300)+500))
	for serverID := range rf.peers {
		if serverID != rf.me {
			go func(server int) {
				reply := RequestVoteReply{}
				if ok := rf.sendRequestVote(server, &args, &reply); ok {
					rf.mu.Lock()
					defer rf.mu.Unlock()
					// ignore stale replies from an older round
					if rf.currentTerm != candidateTerm || rf.role != CANDIDATE {
						return
					}
					if reply.VoteGranted {
						voteCount++
						if !notified && voteCount >= len(rf.peers)/2 {
							voteOk <- struct{}{}
							notified = true // later votes need not signal again
						}
					} else if reply.Term > candidateTerm {
						if rf.role == CANDIDATE {
							rf.stepDown(reply.Term)
							stepDownCh <- struct{}{}
						}
					}
				}
			}(serverID)
		}
	}
	select {
	case <-stepDownCh:
		return
	case <-voteOk:
		RPrintf(rf, "Become Leader")
		rf.changeRole(LEADER)
		return
	case <-rf.chAppendEntry: // another leader has already been elected
		RPrintf(rf, "Receive leader's heartbeat")
		rf.changeRole(FOLLOWER)
		return
	case <-rf.chRequestVote: // granted a vote to another legal candidate
		rf.changeRole(FOLLOWER)
		RPrintf(rf, "Receive others' requestVote")
		return
	case <-timer.C: // no majority before the timeout
		rf.changeRole(FOLLOWER)
		return
	}
}

// following runs the follower loop: wait for heartbeats or granted-vote
// signals, resetting the election timer on each; when the timer fires
// without either, become a candidate.
func (rf *Raft) following() {
	// the paper suggests 150~300ms; this implementation uses 400~1400ms
	newTimeout := func() time.Duration {
		return time.Millisecond * time.Duration(rand.Intn(1000)+400)
	}
	timer := time.NewTimer(newTimeout())
	for !rf.killed() {
		select {
		case <-timer.C:
			RPrintf(rf, "Eletion Timeout, start election\n")
			rf.changeRole(CANDIDATE)
			return
		case <-rf.chAppendEntry: // receive the heartbeat
			timer.Reset(newTimeout())
		case <-rf.chRequestVote:
			RPrintf(rf, "Recieve the candidates' request for vote\n")
			timer.Reset(newTimeout())
		}
	}
}

// startServing drives the role state machine until the server is killed:
// each iteration dispatches to the handler for the current role, which
// returns when the role changes.
func (rf *Raft) startServing() {
	for !rf.killed() {
		rf.mu.Lock()
		role := rf.role
		rf.mu.Unlock()

		switch role {
		case FOLLOWER:
			rf.following()
		case CANDIDATE:
			rf.election()
		case LEADER:
			rf.sendingHeartBeat()
		}
	}
}

//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
// Make creates a Raft server. peers holds the RPC endpoints of all
// servers (including this one, at peers[me]); persister stores this
// server's persistent state and holds the most recently saved state, if
// any; applyCh is where committed ApplyMsgs are delivered. Make returns
// quickly, starting goroutines for all long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	// default (pre-crash-recovery) state
	rf.currentTerm = 0
	rf.votedFor = -1
	rf.log = make([]LogEntry, 0, 16)
	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.matchIndex = make([]int, len(rf.peers))
	rf.nextIndex = make([]int, len(rf.peers))

	rf.chAppendEntry = make(chan struct{})
	rf.chRequestVote = make(chan struct{})

	rf.role = FOLLOWER

	rf.applyCh = applyCh
	// lab3B: default log start index; readPersist below restores the
	// persisted value when one exists
	rf.startIndex = 1

	if persister.SnapshotSize() > 0 {
		RPrintf(rf, "Reading snapshot")
		snapshot := DecodeSnapshot(persister.ReadSnapshot())
		rf.lastIncludedIndex = snapshot.LastIncludedIndex
		rf.lastIncludedTerm = snapshot.LastIncludedTerm
	} else {
		rf.lastIncludedIndex = 0
		rf.lastIncludedTerm = -1
	}

	// BUG FIX: restore persisted state BEFORE starting the service
	// goroutines; previously they started first, so an election could
	// begin with term 0 / an empty log while readPersist was still
	// racing to fill the real state in.
	rf.readPersist(persister.ReadRaftState())

	go rf.startServing()
	go rf.logApplier(applyCh) // goroutine that feeds the state machine

	return rf
}

// Snapshot is the lab3B snapshot payload: the point in the log it covers
// plus the kvserver state captured at that point.
type Snapshot struct {
	LastIncludedIndex int // last log index covered by this snapshot
	LastIncludedTerm int // term of the entry at LastIncludedIndex
	Storage map[string]string // key/value store contents — presumably the kvserver map; confirm against caller
	Acked map[int64]int64 // per-client dedup state — assumed client id -> last acked request id; TODO confirm
}

// encodeSnapshot serializes a Snapshot with labgob, in the exact field
// order that DecodeSnapshot expects.
func encodeSnapshot(snapshot *Snapshot) []byte {
	var buf bytes.Buffer
	enc := labgob.NewEncoder(&buf)
	if err := enc.Encode(snapshot.LastIncludedIndex); err != nil {
		log.Fatal("encode lastIncludeIndex err:", err)
	}
	if err := enc.Encode(snapshot.LastIncludedTerm); err != nil {
		log.Fatal("encode lastIncludeTerm err:", err)
	}
	if err := enc.Encode(snapshot.Storage); err != nil {
		log.Fatalln(err)
	}
	if err := enc.Encode(snapshot.Acked); err != nil {
		log.Fatalln(err)
	}
	return buf.Bytes()
}

// DecodeSnapshot deserializes snapshot bytes produced by encodeSnapshot;
// the decode order must mirror the encode order.
func DecodeSnapshot(data []byte) *Snapshot {
	dec := labgob.NewDecoder(bytes.NewBuffer(data))
	snapshot := new(Snapshot)

	if err := dec.Decode(&snapshot.LastIncludedIndex); err != nil {
		log.Fatalln(err)
	}
	if err := dec.Decode(&snapshot.LastIncludedTerm); err != nil {
		log.Fatalln(err)
	}
	if err := dec.Decode(&snapshot.Storage); err != nil {
		log.Fatalln("Error decoding Storage", err)
	}
	if err := dec.Decode(&snapshot.Acked); err != nil {
		log.Fatalln("Error decoding Acked", err)
	}
	return snapshot
}

// ExceededMaxRaftState reports whether the persisted Raft state has grown
// beyond maxraftstate, i.e. whether the kvserver should snapshot now.
// A negative maxraftstate means "no limit / never snapshot".
func (rf *Raft) ExceededMaxRaftState(maxraftstate int) bool {
	// BUG FIX: guard the negative "unlimited" sentinel (size > -1 was
	// always true), and drop the non-gofmt trailing semicolon.
	return maxraftstate >= 0 && rf.persister.RaftStateSize() > maxraftstate
}

// BuildInstallSnapshot packages the kvserver state (storage + acked) with
// lastApplied and its term into a snapshot, trims the log up to
// lastApplied, and atomically persists both the raft state and the
// snapshot. Called by the kvserver when the raft state grows too large.
func (rf *Raft) BuildInstallSnapshot(storage map[string]string, acked map[int64]int64) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// capture the term BEFORE trimming, while lastApplied is still addressable
	snapshot := Snapshot{LastIncludedIndex: rf.lastApplied, 
						LastIncludedTerm: rf.logAt(rf.lastApplied).Term, 
						Storage: storage, Acked: acked}
	data := encodeSnapshot(&snapshot)
	rf.trimLog(rf.lastApplied)
	rf.lastIncludedIndex = snapshot.LastIncludedIndex
	rf.lastIncludedTerm = snapshot.LastIncludedTerm
	rf.lastApplied = max(rf.lastApplied, rf.lastIncludedIndex) // keep lastApplied monotonic
	rf.commitIndex = max(rf.commitIndex, rf.lastIncludedIndex) // keep commitIndex monotonic
	rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), data)  // save state and snapshot
	// rf.persist() // save state
}

// logAt returns the entry at the given logical index. Indices already
// compacted into the snapshot yield a placeholder entry that carries
// lastIncludedTerm, so term comparisons still work.
func (rf *Raft) logAt(index int) *LogEntry {
	if index < rf.startIndex {
		return &LogEntry{"", rf.lastIncludedTerm}
	}
	return &rf.log[index-rf.startIndex]
}

// convLogIndex converts a logical log index into a position in rf.log.
// Indices below startIndex clamp to 0; a follower that needs such an
// index must be sent an InstallSnapshot RPC instead.
func (rf *Raft) convLogIndex(index int) int {
	if index >= rf.startIndex {
		return index - rf.startIndex
	}
	return 0
}

// getLogTailIndex returns the logical index one past the last log entry.
func (rf *Raft) getLogTailIndex() int {
	return len(rf.log) + rf.startIndex
}

// trimLog discards every entry up to and including lastIncludeIndex (the
// last index covered by a snapshot) and advances startIndex past it.
func (rf *Raft) trimLog(lastIncludeIndex int) {
	// BUG FIX: was `<=`, which skipped trimming when lastIncludeIndex ==
	// startIndex and left startIndex out of sync with lastIncludedIndex
	// (the struct documents the invariant startIndex == lastIncludedIndex+1)
	if lastIncludeIndex < rf.startIndex {
		return
	}
	rf.log = rf.log[rf.convLogIndex(lastIncludeIndex + 1):]
	rf.startIndex = lastIncludeIndex + 1
}


// InstallSnapshotArgs: arguments for sending snapshots to followers that
// are too far behind the start of the leader's log.
type InstallSnapshotArgs struct {
	Term int // leader's term
	LeaderID int // so the follower can redirect clients
	LastIncludedIndex int // last log index the snapshot covers
	LastIncludedTerm  int // term of the entry at LastIncludedIndex
	Data []byte   // raw bytes of snapshots (without partition)
}

// InstallSnapshotReply is the InstallSnapshot RPC reply structure.
type InstallSnapshotReply struct {
	Term int // currentTerm, for leader to update itself
}

// sendInstallSnapshot issues the InstallSnapshot RPC to the given peer
// and reports whether a reply arrived.
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
	return rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
}

// leaderSendInstallSnapshot ships the current snapshot to a follower
// that has fallen behind the start of the leader's log (lab3B), then
// fast-forwards that follower's nextIndex/matchIndex.
func (rf *Raft) leaderSendInstallSnapshot(server int) {
	args := InstallSnapshotArgs{}
	reply := InstallSnapshotReply{}
	rf.mu.Lock()
	args.Term = rf.currentTerm
	args.LeaderID = rf.me
	args.LastIncludedIndex = rf.lastIncludedIndex
	args.LastIncludedTerm = rf.lastIncludedTerm
	args.Data = rf.persister.ReadSnapshot()
	leaderCurrentTerm := rf.currentTerm
	rf.mu.Unlock()

	if ok := rf.sendInstallSnapshot(server, &args, &reply); ok {
		rf.mu.Lock()
		defer rf.mu.Unlock()
		// step down if a higher term came back while still leader
		if leaderCurrentTerm < reply.Term && rf.role == LEADER {
			rf.stepDown(reply.Term)
			RPrintf(rf, "Saw term %d, step down\n", reply.Term)
			return
		}
		// BUG FIX: consistent with the AppendEntries reply path — ignore
		// stale replies once leadership or the term has changed, instead
		// of clobbering nextIndex/matchIndex.
		if rf.role != LEADER || rf.currentTerm != leaderCurrentTerm {
			return
		}
		rf.nextIndex[server] = rf.startIndex
		rf.matchIndex[server] = rf.lastIncludedIndex
	}
}

// InstallSnapshot is the followers' InstallSnapshot RPC handler: it
// replaces compacted log state with the leader's snapshot, persists, and
// forwards the snapshot data to the state machine via applyCh.
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	// reply immediately if the argsTerm < currentTerm, or the snapshot is
	// older than what we already hold
	rf.mu.Lock()
	// defer rf.mu.Unlock()
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm || args.LastIncludedIndex < rf.startIndex {
		rf.mu.Unlock()
		return
	}

	// install the snapshot
	// rf.persister.SaveStateAndSnapshot([]byte{}, args.Data)
	rf.lastIncludedIndex = args.LastIncludedIndex
	rf.lastIncludedTerm = args.LastIncludedTerm
	// bring the volatile bookkeeping up to the snapshot point
	rf.lastApplied = max(rf.lastApplied, rf.lastIncludedIndex)
	rf.commitIndex = max(rf.commitIndex, rf.lastIncludedIndex)
	
	// if log entry has the same index and term as the snapshot last included entry, retain following and return
	if args.LastIncludedIndex >= rf.startIndex && args.LastIncludedIndex < rf.getLogTailIndex() {
		if rf.logAt(args.LastIncludedIndex).Term == args.LastIncludedTerm {
			rf.trimLog(args.LastIncludedIndex)
			rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), args.Data)
			rf.mu.Unlock()
			// reset the state machine (send after releasing rf.mu so a slow
			// consumer cannot block the peer with the lock held)
			rf.applyCh <- ApplyMsg{false, struct{}{}, -1, -1, args.Data}
			return
		}
	}
	// discard entire log
	rf.log = make([]LogEntry, 0, 16)
	rf.startIndex = rf.lastIncludedIndex + 1
	rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), args.Data)
	rf.mu.Unlock()
	// reset the state machine
	rf.applyCh <- ApplyMsg{false, struct{}{}, -1, -1, args.Data}
}